package com.qfx.demo.im.service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.qfx.demo.common.vo.MessageBean;

@ServerEndpoint("/im/{userId}")
@Component
public class ImSer {
	private static final Logger LOG = LoggerFactory.getLogger(ImSer.class);
	// 静态变量，用来记录当前在线连接数。应该把它设计成线程安全的。
	private static int onlineCount = 0;
	// 与某个客户端的连接会话，需要通过它来给客户端发送数据
	private Session session;
	// 新：使用map对象，便于根据userId来获取对应的WebSocket
	private static Map<String, ImSer> websocketMap = new ConcurrentHashMap<>();
	// 接收sid
	private String userId = "";
	// 最后一次连接时间
	private long expireTime = 0;
	// 最大允许间隔时间(毫秒)
	private long ttlExp = 15000 * 100;

	/**
	 * 连接建立成功调用的方法
	 */
	@OnOpen
	public void onOpen(Session session, @PathParam("userId") String userId) {
        this.session = session;
        //判断集合中是否存在当前用户
        if (!websocketMap.containsKey(userId)) {
        	LOG.info("websocketMap->" + JSON.toJSONString(websocketMap));
            websocketMap.put(userId, this);
            addOnlineCount();           //在线数加1
        }
        LOG.info("有新窗口开始监听:" + userId + ",当前在线人数为" + getOnlineCount());
        this.userId = userId;
        
        sendMessage(JSON.toJSONString(new MessageBean("连接成功")));
        // 设置用户连接状态最大有效期
        //方式一
//        toolRedis.set(ToolStaticInfo.WEBSOCKET + this.userId, new Date().getTime(), ToolStaticInfo.WEBSOCKET_TTL_EXP);
        //方式二
        this.expireTime = new Date().getTime();
        
        // TODO 用户上线,获取所有发给当前用户的信息,发送给当前用户
	}

	/**
	 * 连接关闭调用的方法
	 */
	@OnClose
	public void onClose() {
		if (websocketMap.get(this.userId) != null) {
			websocketMap.remove(this.userId);
			subOnlineCount(); // 在线数减1
			LOG.info("有一连接关闭！当前在线人数为" + getOnlineCount());
		}
	}

	/**
	 * 收到客户端消息后调用的方法
	 *
	 * @param message 客户端发送过来的消息
	 */
	@OnMessage
	public void onMessage(String message, Session session) {
		LOG.info("收到来自窗口" + userId + "的信息:" + message);
		if (StringUtils.isNotBlank(message)) {
			try {
				// 1. 解析接收到的报文
				JSONObject messageJson = JSONObject.parseObject(message);
				String messageType = messageJson.getString("messageType");	// 心跳检测heartbeat,发送信息resave
				if ("heartbeat".equals(messageType)) {
//					long ttlExp = toolRedis.getExpire(ToolStaticInfo.WEBSOCKET + this.userId);
					long expire = new Date().getTime() - this.expireTime;
					if (expire > ttlExp) {
						//2. 心跳检测,设置有效时间为15秒
//						toolRedis.expire(ToolStaticInfo.WEBSOCKET + this.userId, ToolStaticInfo.WEBSOCKET_TTL_EXP);
						this.expireTime = new Date().getTime();
					}
				} else {
					// 解析发送的报文
					String toUserId = messageJson.getString("toUserId");
					String contentText = messageJson.getString("contentText");
					messageJson.put("fromUserId", this.userId);
					// 传送给对应用户的websocket
					if (StringUtils.isNotBlank(toUserId) && StringUtils.isNotBlank(contentText)) {
						ImSer socketx = websocketMap.get(toUserId);
						// 需要进行转换，userId
						if (socketx != null) {
							// 放入List中
							List<JSONObject> chatList = new ArrayList<JSONObject>();
							chatList.add(messageJson);
							
							socketx.sendMessage(JSON.toJSONString(chatList));
							// 此处可以放置相关业务代码，例如存储到数据库
						} else {
							// TODO 用户不在线,存入数据库,等待上线执行
						}
					}
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 *
	 * @param session
	 * @param error
	 */
	@OnError
	public void onError(Session session, Throwable error) {
		LOG.error("发生错误");
		error.printStackTrace();
	}

	/**
	 * 实现服务器主动推送
	 */
	public void sendMessage(String message) {
		try {
			this.session.getBasicRemote().sendText(message);
		} catch (IOException e) {
			LOG.error("websocket发送消息[{}]发生异常[{}]", message, e.getMessage());
		}
	}

	/**
	 * 群发自定义消息
	 */
	
	public static void sendInfo(String message, @PathParam("userId") String userId) {
		LOG.info("推送消息到窗口" + userId + "，推送内容:" + message);
		if (null == userId || "".equals(userId)) {
			LOG.info("群推消息,推送内容:" + message);
			// 群发
			for (String key : websocketMap.keySet()) {
				ImSer socketx = websocketMap.get(key);
				if (null != socketx) {
					socketx.sendMessage(message);
				}
			}
		} else {
			LOG.info("推送消息到窗口" + userId + ",推送内容:" + message);
			// 指定发送
			ImSer socketx = websocketMap.get(userId);
			if (null != socketx) {
				socketx.sendMessage(message);
			}
		}
	}
	
	/**
	 * <h5>功能:发送心跳,使用一个task定时调用即可</h5>
	 * 心跳检测机制:
	 * 	1.服务端
	 * 		1.1客户端定时发送心跳检测的信息到服务端,服务端更新当前用户的有效期
	 * 		1.2服务端定时检测所有用户的有效期是否超过指定时间,超过则断开连接,否则发送心跳信息到客户端
	 * 	2.客户端
	 * 		2.1检测机制与服务端相同
	 * @param message
	 * @throws IOException 
	 */
	public static void sendHeartbeat(String message) {

		LOG.info("websocket心跳检测:" + message);
		// 群发
		for (String key : websocketMap.keySet()) {
			// 方式一,从redis中获取有效时间
//			long expire = toolRedis.getExpire(ToolStaticInfo.WEBSOCKET + key);
//			ImSer socketx = websocketMap.get(key);
//			if (expire > 0 && null != socketx) {
//				System.out.println(socketx.userId);
//				socketx.sendMessage(message);
//			} else {
//				websocketMap.remove(key);
//				subOnlineCount(); // 在线数减1
//				LOG.info("用户[{}]断开连接！当前在线人数为{}", key , getOnlineCount());
//			}
			// 方式二,从websocket对象中对比
			ImSer socketx = websocketMap.get(key);
			boolean falg = false;
			if (null != socketx) {
				long expire = new Date().getTime() - socketx.expireTime;
				if (expire > socketx.ttlExp) {
					System.out.println(socketx.userId);
					socketx.sendMessage(message);
					falg = true;
				}
			}
			if (falg == false) {
				websocketMap.remove(key);
				subOnlineCount(); // 在线数减1
				LOG.info("用户[{}]断开连接！当前在线人数为{}", key , getOnlineCount());
			}
		}
	}
	
	/**
	 * <h5>功能:指定用户强制退出</h5>
	 *
	 * @param userId
	 * @throws IOException
	 */
	public static void quit(String userId) {
		if (websocketMap.get(userId) != null) {
			sendInfo(JSON.toJSONString(new MessageBean("当前连接已被关闭!")), userId);
			
			websocketMap.remove(userId);
			subOnlineCount(); // 在线数减1
			LOG.info("有一连接关闭！当前在线人数为" + getOnlineCount());
		}
	}

	public static synchronized int getOnlineCount() {
		return onlineCount;
	}

	public static synchronized void addOnlineCount() {
		ImSer.onlineCount++;
	}

	public static synchronized void subOnlineCount() {
		ImSer.onlineCount--;
	}
}
